ActiveMq

您所在的位置:网站首页 activemq 最新版本 ActiveMq

ActiveMq

#ActiveMq| 来源: 网络整理| 查看: 265

安装

下载页面 image.png 选择对应的版本,推荐15,如果使用17需要jdk11 image.png 启动文件路径 此时可能存在端口占用报错,和我之前的rabbitmq 的 erl 冲突了,启动成功以后,访问 http://localhost:8161/admin/ 账号密码都是 admin image.png

native方式调用

image.png

org.apache.activemq activemq-client 5.15.15

注意版本对应 producer

public class ActiveMQProducer { private static final String BROKER_URL = "tcp://127.0.0.1:61616"; private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; private static final String QUEUE_NAME = "queue_demo"; public static void main(String[] args) throws JMSException { // 创建连接 Connection connection = getConnection(); // 创建会话 Session session = getSession(connection); // 创建队列 Queue queue = getQueue(session); // 创建 Producer MessageProducer producer = session.createProducer(queue); // 发送三条消息 for (int i = 0; i ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL); Connection connection = factory.createConnection(); // 启动连接 connection.start(); return connection; } public static Session getSession(Connection connection) throws JMSException { // 第一个参数 tranceted,是否开启事务。这里设置为 false,无需开启 // 第二个参数 AUTO_ACKNOWLEDGE 确认模式这里设置为 自动确认 return connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } public static Queue getQueue(Session session) throws JMSException { // 创建目的地 (两种 : 队列/主题 这里用队列) return session.createQueue(QUEUE_NAME); } }

activemq 有两种消息模式,对上图上的 destination

点对点:queue 消息不丢失 有状态发布订阅:topic 消息会丢失 广播

consumer

public class ActiveMQConsumer { public static void main(String[] args) throws JMSException { Connection connection = ActiveMQProducer.getConnection(); Session session = ActiveMQProducer.getSession(connection); Queue queue = ActiveMQProducer.getQueue(session); // 创建 consumer consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if (null != message && message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println(String.format("[线程:%s][消息编号:%s][消息内容:%s]", Thread.currentThread(), textMessage.getJMSMessageID(), textMessage.getText())); } catch (JMSException e) { throw new RuntimeException(e); } } } }); // 关闭 try { TimeUnit.HOURS.sleep(1); } catch (InterruptedException ignore) { } session.close(); connection.close(); } }

image.png 报错通过添加依赖可解决

org.slf4j slf4j-simple 1.7.25 compile SpringBoot中使用

可参考 Spring 实战中的实现,这里用的版本 2.7.8

org.springframework.boot spring-boot-starter-parent 2.7.8 org.springframework.boot spring-boot-starter-activemq org.springframework.boot spring-boot-starter-test test org.projectlombok lombok

配置下 全局的connection(注意如果使用spring提供给我们的 jmsTemplate,这里配置只能有一个,无法做到连接多个mq,以及配置不同的消息策略,后面会讲到如何扩展,自定义注入 connectionFactory)

spring: activemq: broker-url: tcp://127.0.0.1:61616 user: admin password: admin packages: trust-all: true # 可信任的反序列化包 没有配置的话无法序列化 @Data public class Demo01Message implements Serializable { public static final String QUEUE = "QUEUE_DEMO_01"; /** * 编号 */ private Integer id; } @Component public class BroadcastProducer { @Resource(name = ActiveMQConfig.BROADCAST_JMS_TEMPLATE_BEAN_NAME) private JmsMessagingTemplate jmsMessagingTemplate; public void syncSend(Integer id) { // 创建 BroadcastMessage 消息 BroadcastMessage message = new BroadcastMessage(); message.setId(id); // 同步发送消息 jmsMessagingTemplate.convertAndSend(BroadcastMessage.TOPIC, message); } } // Demo01Consumer.java @Component public class Demo01Consumer { private Logger logger = LoggerFactory.getLogger(getClass()); @JmsListener(destination = Demo01Message.QUEUE) public void onMessage(Demo01Message message) { logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message); } }

测试类

@SpringBootTest public class Demo01ProducerTest { @Autowired private Demo01Producer producer; @Test public void testSyncSend() throws InterruptedException { // 发送消息 int id = (int) (System.currentTimeMillis() / 1000); producer.syncSend(id); logger.info("[testSyncSend][发送编号:[{}] 发送成功]", id); // 阻塞等待,保证消费 new CountDownLatch(1).await(); } } 延迟队列实现 配置 activeMq 使其支持延时消息

ActvieMQ 5.4版本内置可选持久化定时任务消息发送,但是必须设置schedulerSupport属性为True 即:

如何实现修改消息属性 @Autowired JmsTemplate jmsTemplate; public void syncSend(Integer id) { // 创建 Demo01MNessage 消息 Demo01Message demo01Message = new Demo01Message(); demo01Message.setId(id); // 延时10s jmsTemplate.convertAndSend(Demo01Message.QUEUE, demo01Message, message -> { long time = 10 * 1000; message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); return message; }); }

可参考这里的写法 convertAndSend(String destinationName, final Object message, final MessagePostProcessor postProcessor) 其中 MessagePostProcessor 是一个函数式接口

@FunctionalInterface public interface MessagePostProcessor { Message postProcessMessage(Message message) throws JMSException; } 延时参数设置 属性名称类型描述AMQ_SCHEDULED_DELAYlong消息延迟时间单位:毫秒AMQ_SCHEDULED_PERIODlong消息发送周期单位时间:毫秒。如 5秒一次 配置 AMQ_SCHEDULED_PERIOD = 5*1000AMQ_SCHEDULED_REPEATint消息重复发送次数AMQ_SCHEDULED_CRONstring使用Cron 表达式 设置定时发送

这几个参数都可以通过在 MessagePostProcessor 设置 message 属性实现 给两个比较有代表性的例子

1. 开始延迟30秒发送,重复发送10次,每次之间间隔10秒 long delay = 30 * 1000; long period = 10 * 1000; int repeat = 9; message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); 2.使用Cron 表示式定时发送消息 message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *"); TODO 不同消息模式的实现:测试集群消费和广播消费ack 与 可靠性投递MessageConverter 消息转换器配置序列化方式


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3